Skip to content

API Reference

Node

pyodx.api.Node

A client to interact with NodeODX API.

Parameters:

Name Type Description Default
host str

Hostname or IP address of processing node

required
port int

Port of processing node

required
token str

Token to use for authentication

''
timeout int

Timeout value in seconds for network requests

30
Source code in pyodx/api.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
class Node:
    """A client to interact with NodeODX API.

    Args:
        host (str): Hostname or IP address of processing node
        port (int): Port of processing node
        token (str): Token to use for authentication
        timeout (int): Timeout value in seconds for network requests
    """
    # Case-insensitive prefixes used to recognize remote (HTTP/HTTPS) file paths.
    prefixHttp = re.compile('http:', re.I)
    prefixHttps = re.compile('https:', re.I)

    # Matches ".<ext>?" in a URL; used to detect pre-signed object URLs that
    # carry an access token in their query string.
    presignedURL = re.compile(r"\.[a-z]*\?", re.I)

    def __init__(self, host, port, token="", timeout=30):
        self.host = host
        self.port = port
        self.token = token
        self.timeout = timeout

    @staticmethod
    def from_url(url, timeout=30):
        """Create a Node instance from a URL.

        Args:
            url (str): URL in the format proto://hostname:port/?token=value
            timeout (int): Timeout value in seconds for network requests

        Returns:
            Node (Node): Node instance

        Examples:
            >>> n = Node.from_url("http://localhost:3000?token=abc")
        """
        u = urlparse(url)
        qs = parse_qs(u.query)

        port = u.port
        if port is None:
            # Fall back to the scheme's default port when none is explicit.
            port = 443 if u.scheme == 'https' else 80

        token = ""
        if 'token' in qs:
            token = qs['token'][0]

        return Node(u.hostname, port, token, timeout)

    @staticmethod
    def compare_version(node_version, compare_version):
        """Compare two NodeODX version strings ("major.minor.build").

        Args:
            node_version (str): Version reported by the node
            compare_version (str): Version to compare against

        Returns:
            int: -1 if the node version is lower than compare (or cannot be
                parsed), 0 if equal, 1 if the node version is higher.
        """
        if node_version is None or len(node_version) < 3:
            return -1

        if node_version == compare_version:
            return 0

        try:
            (n_major, n_minor, n_build) = map(int, node_version.split("."))
            (c_major, c_minor, c_build) = map(int, compare_version.split("."))
        except (ValueError, TypeError, AttributeError):
            # Not a well-formed "major.minor.build" string; treat as older.
            return -1

        # Collapse each version into a single comparable integer.
        n_number = 1000000 * n_major + 1000 * n_minor + n_build
        c_number = 1000000 * c_major + 1000 * c_minor + c_build

        if n_number < c_number:
            return -1
        else:
            return 1

    def url(self, url, query=None):
        """Get a URL relative to this node.

        Args:
            url (str): Relative URL
            query (dict): Query values to append to the URL

        Returns:
            URL (str): Absolute URL
        """
        # Work on a copy so neither the caller's dict nor a shared default
        # is mutated when the auth token is appended below.
        query = dict(query) if query else {}

        netloc = self.host if (self.port == 80 or self.port == 443) else "{}:{}".format(self.host, self.port)
        proto = 'https' if self.port == 443 else 'http'

        if len(self.token) > 0:
            query['token'] = self.token

        return urlunparse((proto, netloc, url, '', urlencode(query), ''))

    def get(self, url, query=None, **kwargs):
        """Perform a GET request against this node.

        Args:
            url (str): Relative URL
            query (dict): Query values to append to the URL
            **kwargs: Extra keyword arguments forwarded to requests.get

        Returns:
            dict|requests.Response: Parsed JSON for JSON responses, the raw
                response object otherwise.

        Raises:
            NodeResponseError: On 401 or an API-level error payload
            NodeServerError: On unexpected status codes or invalid JSON
            NodeConnectionError: On timeouts or connection failures
        """
        if query is None:
            query = {}
        try:
            res = requests.get(self.url(url, query), timeout=self.timeout, **kwargs)
            if res.status_code == 401:
                raise NodeResponseError("Unauthorized. Do you need to set a token?")
            elif res.status_code not in (200, 403, 206):
                raise NodeServerError("Unexpected status code: %s" % res.status_code)

            if "Content-Type" in res.headers and "application/json" in res.headers['Content-Type']:
                result = res.json()
                if 'error' in result:
                    raise NodeResponseError(result['error'])
                return result
            else:
                return res
        except json.decoder.JSONDecodeError as e:
            raise NodeServerError(str(e))
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            raise NodeConnectionError(str(e))

    def post(self, url, data=None, headers=None):
        """Perform a POST request against this node.

        Args:
            url (str): Relative URL
            data: Request body (forwarded to requests.post)
            headers (dict): Extra request headers

        Returns:
            dict|requests.Response: Parsed JSON for JSON responses, the raw
                response object otherwise.

        Raises:
            NodeResponseError: On 401 or an API-level error payload
            NodeServerError: On unexpected status codes or invalid JSON
            NodeConnectionError: On timeouts or connection failures
        """
        try:
            res = requests.post(self.url(url), data=data, headers=headers, timeout=self.timeout)

            if res.status_code == 401:
                raise NodeResponseError("Unauthorized. Do you need to set a token?")
            elif res.status_code != 200 and res.status_code != 403:
                # Match get()'s message format for consistency.
                raise NodeServerError("Unexpected status code: %s" % res.status_code)

            if "Content-Type" in res.headers and "application/json" in res.headers['Content-Type']:
                result = res.json()
                if 'error' in result:
                    raise NodeResponseError(result['error'])
                return result
            else:
                return res
        except json.decoder.JSONDecodeError as e:
            raise NodeServerError(str(e))
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            raise NodeConnectionError(str(e))


    def info(self):
        """Retrieve information about this node.

        Returns:
            NodeInfo (NodeInfo): Node information

        Examples:
            >>> n = Node('localhost', 3000)
            >>> n.info().version
            '2.3.1'
            >>> n.info().engine
            'odx'
        """
        return NodeInfo(self.get('/info'))

    def options(self):
        """Retrieve the options available for creating new tasks on this node.

        Returns:
            Options (list[NodeOption]): Available options

        Examples:
            >>> n = Node('localhost', 3000)
            >>> n.options()[0].name
            'end-with'
        """
        return list(map(lambda o: NodeOption(**o), self.get('/options')))

    def version_greater_or_equal_than(self, version):
        """Checks whether this node version is greater than or equal to
        a certain version number.

        Args:
            version (str): Version number to compare

        Returns:
            result (bool): True if the node's version is >= version

        Examples:
            >>> n = Node('localhost', 3000)
            >>> n.version_greater_or_equal_than('1.3.1')
            True
            >>> n.version_greater_or_equal_than('10.5.1')
            False
        """

        node_version = self.info().version
        return self.compare_version(node_version, version) >= 0


    def create_task(self, files, options=None, name=None, progress_callback=None, skip_post_processing=False, webhook=None, outputs=None, parallel_uploads=10, max_retries=5, retry_timeout=5, task_uuid=None):
        """Start processing a new task.
        At a minimum you need to pass a list of image paths. All other parameters are optional.

        Args:
            files (list): List of image paths + optional GCP file path.
            options (dict): Options to use, for example {'orthophoto-resolution': 3, ...}
            name (str): Name for the task
            progress_callback (function): Callback reporting upload progress percentage
            skip_post_processing (bool): When true, skips generation of map tiles, derivative assets, point cloud tiles.
            webhook (str): Optional URL to call when processing has ended (either successfully or unsuccessfully).
            outputs (list): Optional paths relative to the project directory that should be included in the all.zip result file, overriding the default behavior.
            parallel_uploads (int): Number of parallel uploads.
            max_retries (int): Number of attempts to make before giving up on a file upload.
            retry_timeout (int): Wait at least this many seconds before attempting to upload a file a second time, multiplied by the retry number.
            task_uuid (str): An optional UUID string that will be used as UUID for this task instead of generating a random one.

        Returns:
            Task (Task): The created task

        Examples:
            >>> n = Node('localhost', 3000)
            >>> t = n.create_task(['examples/images/image_1.jpg', 'examples/images/image_2.jpg'],
            ...                   {'orthophoto-resolution': 2, 'dsm': True})
            >>> info = t.info()
            >>> info.status
            <TaskStatus.RUNNING: 20>
            >>> info.last_error
            ''
            >>> t.info().images_count
            2
            >>> t.output()[0:2]
            ['DJI_0131.JPG - DJI_0313.JPG has 1 candidate matches', 'DJI_0131.JPG - DJI_0177.JPG has 3 candidate matches']
        """
        # Normalize here instead of using mutable default arguments.
        if options is None:
            options = {}
        if outputs is None:
            outputs = []

        # Nodes older than 1.4.0 don't support the chunked /task/new/init
        # upload flow; use the single-request API instead.
        if not self.version_greater_or_equal_than("1.4.0"):
            return self.create_task_fallback(files, options, name, progress_callback, task_uuid)

        if len(files) == 0:
            raise NodeResponseError("Not enough images")

        fields = {
            'name': name,
            'options': options_to_json(options),
        }

        if skip_post_processing:
            fields['skipPostProcessing'] = 'true'

        if webhook is not None:
            fields['webhook'] = webhook

        if outputs:
            fields['outputs'] = json.dumps(outputs)

        e = MultipartEncoder(fields=fields)
        headers = {'Content-Type': e.content_type}
        if task_uuid is not None:
            # Ask the node to use this UUID instead of generating one.
            headers['set-uuid'] = task_uuid

        result = self.post('/task/new/init', data=e, headers=headers)
        if isinstance(result, dict) and 'error' in result:
            raise NodeResponseError(result['error'])

        if isinstance(result, dict) and 'uuid' in result:
            uuid = result['uuid']
            progress_event = None

            # Mutable state shared with the worker threads below.
            class nonloc:
                uploaded_files = AtomicCounter(0)
                error = None

            # Equivalent as passing the open file descriptor, since requests
            # eventually calls read(), but this way we make sure to close
            # the file prior to reading the next, so we don't run into open file OS limits
            def read_file(file_path):
                if Node.prefixHttp.match(file_path) or Node.prefixHttps.match(file_path):
                    return requests.get(file_path).content
                else:
                    with open(file_path, 'rb') as f:
                        return f.read()

            # Upload
            def worker():
                while True:
                    task = q.get()
                    # None is the sentinel used to shut a worker down.
                    if task is None or nonloc.error is not None:
                        q.task_done()
                        break

                    # Honor the retry back-off before re-attempting this file
                    if task['wait_until'] > datetime.datetime.now():
                        time.sleep((task['wait_until'] - datetime.datetime.now()).seconds)

                    # Assign value to result, prevent UnboundLocalError
                    result = None
                    try:
                        file = task['file']
                        # Check if the object url contains an access token
                        if Node.presignedURL.search(file) is not None:
                            # Strip the query string before deriving the file name
                            fields = {
                                'images': [(os.path.basename(file.split("?")[0]), read_file(file), (mimetypes.guess_type(file)[0] or "image/jpg"))]
                            }
                        else:
                            fields = {
                                'images': [(os.path.basename(file), read_file(file), (mimetypes.guess_type(file)[0] or "image/jpg"))]
                            }

                        e = MultipartEncoder(fields=fields)
                        result = self.post('/task/new/upload/{}'.format(uuid), data=e, headers={'Content-Type': e.content_type})

                        if isinstance(result, dict) and 'success' in result and result['success']:
                            nonloc.uploaded_files.increment()
                            if progress_event is not None:
                                # Wake the main thread so it can report progress
                                progress_event.set()
                        else:
                            if isinstance(result, dict) and 'error' in result:
                                raise NodeResponseError(result['error'])
                            else:
                                raise NodeServerError("Failed upload with unexpected result: %s" % str(result))
                    except GenericError as e:
                        if task['retries'] < max_retries and not (isinstance(result, dict) and 'noRetry' in result and result['noRetry']):
                            # Put task back in queue
                            task['retries'] += 1
                            task['wait_until'] = datetime.datetime.now() + datetime.timedelta(seconds=task['retries'] * retry_timeout)
                            q.put(task)
                        else:
                            nonloc.error = e
                    except Exception as e:
                        nonloc.error = e
                    finally:
                        q.task_done()


            q = queue.Queue()
            threads = []
            for i in range(parallel_uploads):
                t = threading.Thread(target=worker)
                t.start()
                threads.append(t)

            if progress_callback is not None:
                progress_event = threading.Event()

            now = datetime.datetime.now()
            for file in files:
                q.put({
                    'file': file,
                    'wait_until': now,
                    'retries': 0
                })

            # Wait for progress updates
            if progress_event is not None:
                current_progress = 0
                while not q.empty():
                    if progress_event.wait(0.1):
                        progress_event.clear()
                        current_progress = 100.0 * nonloc.uploaded_files.value / len(files)
                        try:
                            progress_callback(current_progress)
                        except Exception as e:
                            # A failing callback aborts the upload below
                            nonloc.error = e
                    if nonloc.error is not None:
                        break

                # Make sure to report 100% complete
                if current_progress != 100 and nonloc.error is None:
                    try:
                        progress_callback(100.0)
                    except Exception as e:
                        nonloc.error = e

            # block until all tasks are done
            if nonloc.error is None:
                q.join()

            # stop workers
            for i in range(parallel_uploads):
                q.put(None)
            for t in threads:
                t.join()

            if nonloc.error is not None:
                raise nonloc.error

            # All files uploaded: ask the node to start processing
            result = self.post('/task/new/commit/{}'.format(uuid))
            return self.handle_task_new_response(result)
        else:
            raise NodeServerError("Invalid response from /task/new/init: %s" % result)

    def create_task_fallback(self, files, options=None, name=None, progress_callback=None, task_uuid=None):
        """Create a task with the pre-chunked API (single multipart POST).

        Used as a fallback for nodes older than 1.4.0; same semantics as
        create_task but uploads everything in one request.
        """
        if options is None:
            options = {}

        if len(files) == 0:
            raise NodeResponseError("Not enough images")

        # Equivalent as passing the open file descriptor, since requests
        # eventually calls read(), but this way we make sure to close
        # the file prior to reading the next, so we don't run into open file OS limits
        def read_file(file_path):
            if Node.prefixHttp.match(file_path) or Node.prefixHttps.match(file_path):
                return requests.get(file_path).content
            else:
                with open(file_path, 'rb') as f:
                    return f.read()

        fields = {
            'name': name,
            'options': options_to_json(options),
            'images': [(os.path.basename(f), read_file(f), (mimetypes.guess_type(f)[0] or "image/jpg")) for
                       f in files]
        }

        def create_callback(mpe):
            total_bytes = mpe.len

            def callback(monitor):
                if progress_callback is not None and total_bytes > 0:
                    progress_callback(100.0 * monitor.bytes_read / total_bytes)

            return callback

        e = MultipartEncoder(fields=fields)
        m = encoder.MultipartEncoderMonitor(e, create_callback(e))

        headers = {'Content-Type': m.content_type}
        if task_uuid is not None:
            headers['set-uuid'] = task_uuid

        result = self.post('/task/new', data=m, headers=headers)

        return self.handle_task_new_response(result)

    def handle_task_new_response(self, result):
        """Turn a /task/new* response payload into a Task, or raise.

        Args:
            result: Parsed response from the node

        Returns:
            Task (Task): Task built from the returned uuid

        Raises:
            NodeResponseError: If the node reported an error
            NodeServerError: If the response shape is unrecognized
        """
        if isinstance(result, dict) and 'uuid' in result:
            return Task(self, result['uuid'])
        elif isinstance(result, dict) and 'error' in result:
            raise NodeResponseError(result['error'])
        else:
            raise NodeServerError('Invalid response: ' + str(result))

    def get_task(self, uuid):
        """Helper method to initialize a task from an existing UUID

        Args:
            uuid (str): Unique identifier of the task

        Returns:
            Task (Task): The task instance

        Examples:
            >>> n = Node("localhost", 3000)
            >>> t = n.get_task('00000000-0000-0000-0000-000000000000')
            >>> t.__class__
            <class 'pyodx.api.Task'>
        """
        return Task(self, uuid)

__init__(host, port, token='', timeout=30)

Source code in pyodx/api.py
def __init__(self, host, port, token="", timeout=30):
    """Store the connection settings for this node client.

    Args:
        host (str): Hostname or IP address of processing node
        port (int): Port of processing node
        token (str): Token to use for authentication
        timeout (int): Timeout value in seconds for network requests
    """
    self.timeout = timeout
    self.token = token
    self.port = port
    self.host = host

from_url(url, timeout=30) staticmethod

Create a Node instance from a URL.

Parameters:

Name Type Description Default
url str

URL in the format proto://hostname:port/?token=value

required
timeout int

Timeout value in seconds for network requests

30

Returns:

Name Type Description
Node Node

Node instance

Examples:

>>> n = Node.from_url("http://localhost:3000?token=abc")
Source code in pyodx/api.py
@staticmethod
def from_url(url, timeout=30):
    """Create a Node instance from a URL.

    Args:
        url (str): URL in the format proto://hostname:port/?token=value
        timeout (int): Timeout value in seconds for network requests

    Returns:
        Node (Node): Node instance

    Examples:
        >>> n = Node.from_url("http://localhost:3000?token=abc")
    """
    parsed = urlparse(url)
    params = parse_qs(parsed.query)

    # Fall back to the scheme's default port when none is given explicitly.
    if parsed.port is not None:
        port = parsed.port
    else:
        port = 443 if parsed.scheme == 'https' else 80

    # parse_qs maps each key to a list of values; keep the first token.
    token = params['token'][0] if 'token' in params else ""

    return Node(parsed.hostname, port, token, timeout)

info()

Retrieve information about this node.

Returns:

Name Type Description
NodeInfo NodeInfo

Node information

Examples:

>>> n = Node('localhost', 3000)
>>> n.info().version
'2.3.1'
>>> n.info().engine
'odx'
Source code in pyodx/api.py
def info(self):
    """Retrieve information about this node.

    Returns:
        NodeInfo (NodeInfo): Node information

    Examples:
        >>> n = Node('localhost', 3000)
        >>> n.info().version
        '2.3.1'
        >>> n.info().engine
        'odx'
    """
    # Fetch the raw /info payload and wrap it in a NodeInfo accessor.
    payload = self.get('/info')
    return NodeInfo(payload)

options()

Retrieve the options available for creating new tasks on this node.

Returns:

Name Type Description
Options list[NodeOption]

Available options

Examples:

>>> n = Node('localhost', 3000)
>>> n.options()[0].name
'end-with'
Source code in pyodx/api.py
def options(self):
    """Retrieve the options available for creating new tasks on this node.

    Returns:
        Options (list[NodeOption]): Available options

    Examples:
        >>> n = Node('localhost', 3000)
        >>> n.options()[0].name
        'end-with'
    """
    # Wrap each raw option dict from /options in a NodeOption.
    return [NodeOption(**opt) for opt in self.get('/options')]

version_greater_or_equal_than(version)

Checks whether this node version is greater than or equal to a certain version number.

Parameters:

Name Type Description Default
version str

Version number to compare

required

Returns:

Name Type Description
result bool

True if the node's version is >= version

Examples:

>>> n = Node('localhost', 3000)
>>> n.version_greater_or_equal_than('1.3.1')
True
>>> n.version_greater_or_equal_than('10.5.1')
False
Source code in pyodx/api.py
def version_greater_or_equal_than(self, version):
    """Checks whether this node version is greater than or equal to
    a certain version number.

    Args:
        version (str): Version number to compare

    Returns:
        result (bool): True if the node's version is >= version

    Examples:
        >>> n = Node('localhost', 3000)
        >>> n.version_greater_or_equal_than('1.3.1')
        True
        >>> n.version_greater_or_equal_than('10.5.1')
        False
    """
    # compare_version returns -1/0/1; 0 and 1 both satisfy ">=".
    return self.compare_version(self.info().version, version) >= 0

create_task(files, options={}, name=None, progress_callback=None, skip_post_processing=False, webhook=None, outputs=[], parallel_uploads=10, max_retries=5, retry_timeout=5, task_uuid=None)

Start processing a new task. At a minimum you need to pass a list of image paths. All other parameters are optional.

Parameters:

Name Type Description Default
files list

List of image paths + optional GCP file path.

required
options dict

Options to use, for example {'orthophoto-resolution': 3, ...}

{}
name str

Name for the task

None
progress_callback function

Callback reporting upload progress percentage

None
skip_post_processing bool

When true, skips generation of map tiles, derivative assets, point cloud tiles.

False
webhook str

Optional URL to call when processing has ended (either successfully or unsuccessfully).

None
outputs list

Optional paths relative to the project directory that should be included in the all.zip result file, overriding the default behavior.

[]
parallel_uploads int

Number of parallel uploads.

10
max_retries int

Number of attempts to make before giving up on a file upload.

5
retry_timeout int

Wait at least this many seconds before attempting to upload a file a second time, multiplied by the retry number.

5
task_uuid str

An optional UUID string that will be used as UUID for this task instead of generating a random one.

None

Returns:

Name Type Description
Task Task

The created task

Examples:

>>> n = Node('localhost', 3000)
>>> t = n.create_task(['examples/images/image_1.jpg', 'examples/images/image_2.jpg'],
...                   {'orthophoto-resolution': 2, 'dsm': True})
>>> info = t.info()
>>> info.status
<TaskStatus.RUNNING: 20>
>>> info.last_error
''
>>> t.info().images_count
2
>>> t.output()[0:2]
['DJI_0131.JPG - DJI_0313.JPG has 1 candidate matches', 'DJI_0131.JPG - DJI_0177.JPG has 3 candidate matches']
Source code in pyodx/api.py
def create_task(self, files, options={}, name=None, progress_callback=None, skip_post_processing=False, webhook=None, outputs=[], parallel_uploads=10, max_retries=5, retry_timeout=5, task_uuid=None):
    """Start processing a new task.
    At a minimum you need to pass a list of image paths. All other parameters are optional.

    Args:
        files (list): List of image paths + optional GCP file path.
        options (dict): Options to use, for example {'orthophoto-resolution': 3, ...}
        name (str): Name for the task
        progress_callback (function): Callback reporting upload progress percentage
        skip_post_processing  (bool): When true, skips generation of map tiles, derivative assets, point cloud tiles.
        webhook (str): Optional URL to call when processing has ended (either successfully or unsuccessfully).
        outputs (list): Optional paths relative to the project directory that should be included in the all.zip result file, overriding the default behavior.
        parallel_uploads (int): Number of parallel uploads.
        max_retries (int): Number of attempts to make before giving up on a file upload.
        retry_timeout (int): Wait at least this many seconds before attempting to upload a file a second time, multiplied by the retry number.
        task_uuid (str): An optional UUID string that will be used as UUID for this task instead of generating a random one.

    Returns:
        Task (Task): The created task

    Examples:
        >>> n = Node('localhost', 3000)
        >>> t = n.create_task(['examples/images/image_1.jpg', 'examples/images/image_2.jpg'],
        ...                   {'orthophoto-resolution': 2, 'dsm': True})
        >>> info = t.info()
        >>> info.status
        <TaskStatus.RUNNING: 20>
        >>> info.last_error
        ''
        >>> t.info().images_count
        2
        >>> t.output()[0:2]
        ['DJI_0131.JPG - DJI_0313.JPG has 1 candidate matches', 'DJI_0131.JPG - DJI_0177.JPG has 3 candidate matches']
    """
    # Nodes older than 1.4.0 don't support the chunked /task/new/init
    # upload flow; fall back to the single-request API.
    if not self.version_greater_or_equal_than("1.4.0"):
        return self.create_task_fallback(files, options, name, progress_callback, task_uuid)

    if len(files) == 0:
        raise NodeResponseError("Not enough images")

    fields = {
        'name': name,
        'options': options_to_json(options),
    }

    if skip_post_processing:
        fields['skipPostProcessing'] = 'true'

    if webhook is not None:
        fields['webhook'] = webhook

    if outputs:
        fields['outputs'] = json.dumps(outputs)

    e = MultipartEncoder(fields=fields)
    headers = {'Content-Type': e.content_type}
    if task_uuid is not None:
        # Ask the node to use this UUID instead of generating one.
        headers['set-uuid'] = task_uuid

    result = self.post('/task/new/init', data=e, headers=headers)
    if isinstance(result, dict) and 'error' in result:
        raise NodeResponseError(result['error'])

    if isinstance(result, dict) and 'uuid' in result:
        uuid = result['uuid']
        progress_event = None

        # Mutable state shared with the worker threads below.
        class nonloc:
            uploaded_files = AtomicCounter(0)
            error = None

        # Equivalent as passing the open file descriptor, since requests
        # eventually calls read(), but this way we make sure to close
        # the file prior to reading the next, so we don't run into open file OS limits
        def read_file(file_path):
            if Node.prefixHttp.match(file_path) or Node.prefixHttps.match(file_path):
                return requests.get(file_path).content
            else:
                with open(file_path, 'rb') as f:
                    return f.read()

        # Upload
        def worker():
            while True:
                task = q.get()
                # None is the sentinel used to shut a worker down.
                if task is None or nonloc.error is not None:
                    q.task_done()
                    break

                # Upload file
                # Honor the retry back-off before re-attempting this file.
                if task['wait_until'] > datetime.datetime.now():
                    time.sleep((task['wait_until'] - datetime.datetime.now()).seconds)

                # Assign value to result, prevent UnboundLocalError
                result = None
                try:
                    file = task['file']
                    # Check if the object url contains an access token
                    if Node.presignedURL.search(file) is not None:
                        # Strip the query string before deriving the file name.
                        fields = {
                            'images': [(os.path.basename(file.split("?")[0]), read_file(file), (mimetypes.guess_type(file)[0] or "image/jpg"))]
                        }
                    else:
                        fields = {
                            'images': [(os.path.basename(file), read_file(file), (mimetypes.guess_type(file)[0] or "image/jpg"))]
                        }

                    e = MultipartEncoder(fields=fields)
                    result = self.post('/task/new/upload/{}'.format(uuid), data=e, headers={'Content-Type': e.content_type})

                    if isinstance(result, dict) and 'success' in result and result['success']:
                        uf = nonloc.uploaded_files.increment()
                        if progress_event is not None:
                            # Wake the main thread so it can report progress.
                            progress_event.set()
                    else:
                        if isinstance(result, dict) and 'error' in result:
                            raise NodeResponseError(result['error'])
                        else:
                            raise NodeServerError("Failed upload with unexpected result: %s" % str(result))
                except GenericError as e:
                    if task['retries'] < max_retries and not (isinstance(result, dict) and 'noRetry' in result and result['noRetry']):
                        # Put task back in queue
                        task['retries'] += 1
                        task['wait_until'] = datetime.datetime.now() + datetime.timedelta(seconds=task['retries'] * retry_timeout)
                        q.put(task)
                    else:
                        nonloc.error = e
                except Exception as e:
                    nonloc.error = e
                finally:
                    q.task_done()


        q = queue.Queue()
        threads = []
        for i in range(parallel_uploads):
            t = threading.Thread(target=worker)
            t.start()
            threads.append(t)

        if progress_callback is not None:
            progress_event = threading.Event()

        now = datetime.datetime.now()
        for file in files:
            q.put({
                'file': file,
                'wait_until': now,
                'retries': 0
            })

        # Wait for progress updates
        if progress_event is not None:
            current_progress = 0
            while not q.empty():
                if progress_event.wait(0.1):
                    progress_event.clear()
                    current_progress = 100.0 * nonloc.uploaded_files.value / len(files)
                    try:
                        progress_callback(current_progress)
                    except Exception as e:
                        # A failing callback aborts the upload below.
                        nonloc.error = e
                if nonloc.error is not None:
                    break

            # Make sure to report 100% complete
            if current_progress != 100 and nonloc.error is None:
                try:
                    progress_callback(100.0)
                except Exception as e:
                    nonloc.error = e

        # block until all tasks are done
        if nonloc.error is None:
            q.join()

        # stop workers
        for i in range(parallel_uploads):
            q.put(None)
        for t in threads:
            t.join()

        if nonloc.error is not None:
            raise nonloc.error

        # All files uploaded: ask the node to start processing.
        result = self.post('/task/new/commit/{}'.format(uuid))
        return self.handle_task_new_response(result)
    else:
        raise NodeServerError("Invalid response from /task/new/init: %s" % result)

get_task(uuid)

Helper method to initialize a task from an existing UUID

Parameters:

Name Type Description Default
uuid str

Unique identifier of the task

required

Returns:

Name Type Description
Task Task

The task instance

Examples:

>>> n = Node("localhost", 3000)
>>> t = n.get_task('00000000-0000-0000-0000-000000000000')
>>> t.__class__
<class 'pyodx.api.Task'>
Source code in pyodx/api.py
def get_task(self, uuid):
    """Helper method to initialize a task from an existing UUID

    Args:
        uuid (str): Unique identifier of the task

    Returns:
        Task (Task): The task instance

    Examples:
        >>> n = Node("localhost", 3000)
        >>> t = n.get_task('00000000-0000-0000-0000-000000000000')
        >>> t.__class__
        <class 'pyodx.api.Task'>
    """
    # No network call is made here; the UUID is simply wrapped in a Task
    # handle bound to this node.
    task_handle = Task(self, uuid)
    return task_handle

url(url, query={})

Get a URL relative to this node.

Parameters:

Name Type Description Default
url str

Relative URL

required
query dict

Query values to append to the URL

{}

Returns:

Name Type Description
URL str

Absolute URL

Source code in pyodx/api.py
def url(self, url, query={}):
    """Get a URL relative to this node.

    Args:
        url (str): Relative URL
        query (dict): Query values to append to the URL

    Returns:
        URL (str): Absolute URL
    """
    # Work on a copy of `query`: the previous implementation inserted the
    # token directly into the argument, which mutated the caller's dict and
    # polluted the shared mutable default `{}` — a token set on one call
    # leaked into every later call that relied on the default.
    query = dict(query)

    # Omit the port for the standard HTTP/HTTPS ports.
    netloc = self.host if (self.port == 80 or self.port == 443) else "{}:{}".format(self.host, self.port)
    proto = 'https' if self.port == 443 else 'http'

    if len(self.token) > 0:
        query['token'] = self.token

    return urlunparse((proto, netloc, url, '', urlencode(query), ''))

Task

pyodx.api.Task

A task is created to process images. To create a task, use Node.create_task.

Parameters:

Name Type Description Default
node Node

Node this task belongs to

required
uuid str

Unique identifier assigned to this task.

required
Source code in pyodx/api.py
class Task:
    """A task is created to process images. To create a task, use `Node.create_task`.

    Args:
        node (Node): Node this task belongs to
        uuid (str): Unique identifier assigned to this task.
    """

    def __init__(self, node, uuid):
        self.node = node  # Node that owns this task; all HTTP traffic goes through it
        self.uuid = uuid  # Server-assigned identifier used to address this task in URLs


    def get(self, url, query = {}, **kwargs):
        """Perform a GET through the owning node, raising NodeResponseError
        if the server answered with a JSON error payload.
        """
        # NOTE(review): `query={}` is a shared mutable default; it is only
        # read here so it is harmless, but callers must not mutate it.
        result = self.node.get(url, query, **kwargs)
        if isinstance(result, dict) and 'error' in result:
            raise NodeResponseError(result['error'])
        return result

    def post(self, url, data):
        """Perform a POST through the owning node, raising NodeResponseError
        if the server answered with a JSON error payload.
        """
        result = self.node.post(url, data)
        if isinstance(result, dict) and 'error' in result:
            raise NodeResponseError(result['error'])
        return result

    def info(self, with_output=None):
        """Retrieves information about this task.

        Args:
            with_output: Optional; when set, it is forwarded as the
                `with_output` query parameter so console output is included
                in the result.

        Returns:
            TaskInfo (TaskInfo): Task information
        """
        query = {}
        if with_output is not None:
            query['with_output'] = with_output

        return TaskInfo(self.get('/task/{}/info'.format(self.uuid), query))

    def output(self, line=0):
        """Retrieve console task output.

        Args:
            line (int): Optional line number that the console output should be truncated from. For example, passing a value of 100 will retrieve the console output starting from line 100. Negative numbers are also allowed. For example -50 will retrieve the last 50 lines of console output. Defaults to 0 (retrieve all console output).

        Returns:
            output (list[str]): Console output (one list item per row).
        """
        return self.get('/task/{}/output'.format(self.uuid), {'line': line})

    def cancel(self):
        """Cancel this task.

        Returns:
            success (bool): Task was canceled or not
        """
        # Missing 'success' key in the response is treated as failure.
        return self.post('/task/cancel', {'uuid': self.uuid}).get('success', False)

    def remove(self):
        """Remove this task.

        Returns:
            success (bool): Task was removed or not
        """
        return self.post('/task/remove', {'uuid': self.uuid}).get('success', False)

    def restart(self, options=None):
        """Restart this task.

        Args:
            options (dict): Options to use, for example {'orthophoto-resolution': 3, ...}

        Returns:
            success (bool): Task was restarted or not
        """
        data = {'uuid': self.uuid}
        # Options are serialized to the JSON wire format expected by the node.
        if options is not None: data['options'] = options_to_json(options)
        return self.post('/task/restart', data).get('success', False)

    def download_zip(self, destination, progress_callback=None, parallel_downloads=16, parallel_chunks_size=10):
        """Download this task's assets archive to a directory.

        If the node advertises HTTP range support (Accept-Ranges: bytes) and
        the archive is larger than one chunk, the archive is fetched with
        multiple parallel Range requests and stitched together in order by a
        merge thread; otherwise a single-stream download is used.

        Args:
            destination (str): Directory where to download assets archive. If the directory does not exist, it will be created.
            progress_callback (function): Optional callback with one parameter, the download progress percentage.
            parallel_downloads (int): Maximum number of parallel downloads if the node supports http range.
            parallel_chunks_size (int): Size in MB of chunks for parallel downloads
        Returns:
            Path (str): Path to archive file (.zip)
        """
        # Only completed tasks have a downloadable archive.
        info = self.info()
        if info.status != TaskStatus.COMPLETED:
            raise NodeResponseError("Cannot download task, task status is " + str(info.status))

        if not os.path.exists(destination):
            os.makedirs(destination, exist_ok=True)

        try:
            download_stream = self.get('/task/{}/download/all.zip'.format(self.uuid), stream=True)
            headers = download_stream.headers

            # Timestamp in the filename avoids clobbering earlier downloads.
            zip_path = os.path.join(destination, "{}_{}_all.zip".format(self.uuid, int(time.time())))

            # Keep track of download progress (if possible)
            content_length = download_stream.headers.get('content-length')
            total_length = int(content_length) if content_length is not None else None
            downloaded = 0
            chunk_size = int(parallel_chunks_size * 1024 * 1024)
            use_fallback = False
            accept_ranges = headers.get('accept-ranges')

            # Can we do parallel downloads? Requires range support, a known
            # total size larger than one chunk, and more than one worker.
            if accept_ranges is not None and accept_ranges.lower() == 'bytes' and total_length is not None and total_length > chunk_size and parallel_downloads > 1:
                num_chunks = int(math.ceil(total_length / float(chunk_size)))
                num_workers = parallel_downloads

                # Class used as a mutable namespace so the nested worker/merge
                # functions can share and rebind this state.
                class nonloc:
                    completed_chunks = AtomicCounter(0)
                    merge_chunks = [False] * num_chunks
                    error = None

                def merge():
                    # Append .partN files to the final archive strictly in
                    # order as workers mark them done; polls every 100ms.
                    current_chunk = 0

                    with open(zip_path, "wb") as out_file:
                        while current_chunk < num_chunks and nonloc.error is None:
                            if nonloc.merge_chunks[current_chunk]:
                                chunk_file = "%s.part%s" % (zip_path, current_chunk)
                                with open(chunk_file, "rb") as fd:
                                    out_file.write(fd.read())

                                os.unlink(chunk_file)

                                current_chunk += 1
                            else:
                                time.sleep(0.1)

                def worker():
                    # Pull (part_num, byte_range) jobs off the queue and fetch
                    # each range with an HTTP Range request.
                    while True:
                        task = q.get()
                        part_num, bytes_range = task
                        # (-1, None) is the stop sentinel; also exit early if
                        # another worker recorded an error.
                        if bytes_range is None or nonloc.error is not None:
                            q.task_done()
                            break

                        try:
                            # Download chunk
                            res = self.get('/task/{}/download/all.zip'.format(self.uuid), stream=True, headers={'Range': 'bytes=%s-%s' % bytes_range})
                            if res.status_code == 206:
                                with open("%s.part%s" % (zip_path, part_num), 'wb') as fd:
                                    bytes_written = 0
                                    try:
                                        for chunk in res.iter_content(4096):
                                            bytes_written += fd.write(chunk)
                                    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
                                        raise NodeConnectionError(str(e))

                                    if bytes_written != (bytes_range[1] - bytes_range[0] + 1):
                                        # Short read: re-queue the chunk for
                                        # another worker and end this one.
                                        q.put((part_num, bytes_range))
                                        return

                                # Report progress under the counter's lock.
                                with nonloc.completed_chunks.lock:
                                    nonloc.completed_chunks.value += 1

                                    if progress_callback is not None:
                                        progress_callback(100.0 * nonloc.completed_chunks.value / num_chunks)

                                nonloc.merge_chunks[part_num] = True
                            else:
                                # Server did not honor the Range header; fall
                                # back to a single-stream download.
                                nonloc.error = RangeNotAvailableError()
                        except GenericError as e:
                            # Transient node error: back off, then retry chunk.
                            time.sleep(5)
                            q.put((part_num, bytes_range))
                        except Exception as e:
                            nonloc.error = e
                        finally:
                            q.task_done()

                q = queue.PriorityQueue()
                threads = []
                for i in range(num_workers):
                    t = threading.Thread(target=worker)
                    t.start()
                    threads.append(t)

                merge_thread = threading.Thread(target=merge)
                merge_thread.start()

                range_start = 0

                # Enqueue one (index, inclusive byte range) job per chunk.
                for i in range(num_chunks):
                    range_end = min(range_start + chunk_size - 1, total_length - 1)
                    q.put((i, (range_start, range_end)))
                    range_start = range_end + 1

                # block until all tasks are done
                while not all(nonloc.merge_chunks) and nonloc.error is None:
                    time.sleep(0.1)

                # stop workers (-1 sorts before real jobs in the priority queue)
                for i in range(len(threads)):
                    q.put((-1, None))
                for t in threads:
                    t.join()

                merge_thread.join()

                if nonloc.error is not None:
                    if isinstance(nonloc.error, RangeNotAvailableError):
                        use_fallback = True
                    else:
                        raise nonloc.error
            else:
                use_fallback = True

            if use_fallback:
                # Single connection, boring download
                with open(zip_path, 'wb') as fd:
                    for chunk in download_stream.iter_content(4096):
                        downloaded += len(chunk)

                        if progress_callback is not None and total_length is not None:
                            progress_callback((100.0 * float(downloaded) / total_length))

                        fd.write(chunk)

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, ReadTimeoutError) as e:
            raise NodeConnectionError(e)

        return zip_path

    def download_assets(self, destination, progress_callback=None, parallel_downloads=16, parallel_chunks_size=10):
        """Download this task's assets to a directory.

        Args:
            destination (str): Directory where to download assets. If the directory does not exist, it will be created.
            progress_callback (function): Optional callback with one parameter, the download progress percentage
            parallel_downloads (int): Maximum number of parallel downloads if the node supports http range.
            parallel_chunks_size (int): Size in MB of chunks for parallel downloads
        Returns:
            Path (str): Path to saved assets
        """
        # Fetch the archive, extract it in place, then discard the archive.
        zip_path = self.download_zip(destination, progress_callback=progress_callback, parallel_downloads=parallel_downloads, parallel_chunks_size=parallel_chunks_size)
        with zipfile.ZipFile(zip_path, "r") as zip_h:
            zip_h.extractall(destination)
        os.remove(zip_path)

        return destination

    def wait_for_completion(self, status_callback=None, interval=3, max_retries=5, retry_timeout=5):
        """Wait for the task to complete. The call will block until the task status has become
        `TaskStatus.COMPLETED`. If the status is set to `TaskStatus.CANCELED` or `TaskStatus.FAILED`
        it raises a TaskFailedError exception.

        Args:
            status_callback (function): Optional callback that will be called with task info updates every interval seconds.
            interval (int): Seconds between status checks.
            max_retries (int): Number of repeated attempts that should be made to receive a status update before giving up.
            retry_timeout (int): Wait N*retry_timeout between attempts, where N is the attempt number.
        """
        retry = 0

        while True:
            try:
                info = self.info()
            except NodeConnectionError as e:
                # Linear backoff on connection errors; give up after
                # max_retries consecutive failures.
                if retry < max_retries:
                    retry += 1
                    time.sleep(retry * retry_timeout)
                    continue
                else:
                    raise e

            # A successful poll resets the retry budget.
            retry = 0
            if status_callback is not None:
                status_callback(info)

            if info.status in [TaskStatus.COMPLETED, TaskStatus.CANCELED, TaskStatus.FAILED]:
                break

            time.sleep(interval)

        if info.status in [TaskStatus.FAILED, TaskStatus.CANCELED]:
            raise TaskFailedError(info.status)

info(with_output=None)

Retrieves information about this task.

Returns:

Name Type Description
TaskInfo TaskInfo

Task information

Source code in pyodx/api.py
def info(self, with_output=None):
    """Retrieves information about this task.

    Args:
        with_output: Optional; when set, it is forwarded as the `with_output`
            query parameter so console output is included in the result.

    Returns:
        TaskInfo (TaskInfo): Task information
    """
    # Build the query string lazily: omit the parameter entirely when unset.
    query = {} if with_output is None else {'with_output': with_output}
    return TaskInfo(self.get('/task/{}/info'.format(self.uuid), query))

output(line=0)

Retrieve console task output.

Parameters:

Name Type Description Default
line int

Optional line number that the console output should be truncated from. For example, passing a value of 100 will retrieve the console output starting from line 100. Negative numbers are also allowed. For example -50 will retrieve the last 50 lines of console output. Defaults to 0 (retrieve all console output).

0

Returns:

Name Type Description
output list[str]

Console output (one list item per row).

Source code in pyodx/api.py
def output(self, line=0):
    """Retrieve console task output.

    Args:
        line (int): Optional line number that the console output should be truncated from. For example, passing a value of 100 will retrieve the console output starting from line 100. Negative numbers are also allowed. For example -50 will retrieve the last 50 lines of console output. Defaults to 0 (retrieve all console output).

    Returns:
        output (list[str]): Console output (one list item per row).
    """
    endpoint = '/task/{}/output'.format(self.uuid)
    params = {'line': line}
    return self.get(endpoint, params)

cancel()

Cancel this task.

Returns:

Name Type Description
success bool

Task was canceled or not

Source code in pyodx/api.py
def cancel(self):
    """Cancel this task.

    Returns:
        success (bool): Task was canceled or not
    """
    # Treat a missing 'success' key as failure.
    response = self.post('/task/cancel', {'uuid': self.uuid})
    return response.get('success', False)

remove()

Remove this task.

Returns:

Name Type Description
success bool

Task was removed or not

Source code in pyodx/api.py
def remove(self):
    """Remove this task.

    Returns:
        success (bool): Task was removed or not
    """
    # Treat a missing 'success' key as failure.
    response = self.post('/task/remove', {'uuid': self.uuid})
    return response.get('success', False)

restart(options=None)

Restart this task.

Parameters:

Name Type Description Default
options dict

Options to use, for example {'orthophoto-resolution': 3, ...}

None

Returns:

Name Type Description
success bool

Task was restarted or not

Source code in pyodx/api.py
def restart(self, options=None):
    """Restart this task.

    Args:
        options (dict): Options to use, for example {'orthophoto-resolution': 3, ...}

    Returns:
        success (bool): Task was restarted or not
    """
    payload = {'uuid': self.uuid}
    if options is not None:
        # Options are serialized to the JSON wire format the node expects.
        payload['options'] = options_to_json(options)
    response = self.post('/task/restart', payload)
    return response.get('success', False)

download_zip(destination, progress_callback=None, parallel_downloads=16, parallel_chunks_size=10)

Download this task's assets archive to a directory.

Parameters:

Name Type Description Default
destination str

Directory where to download assets archive. If the directory does not exist, it will be created.

required
progress_callback function

Optional callback with one parameter, the download progress percentage.

None
parallel_downloads int

Maximum number of parallel downloads if the node supports http range.

16
parallel_chunks_size int

Size in MB of chunks for parallel downloads

10

Returns: Path (str): Path to archive file (.zip)

Source code in pyodx/api.py
def download_zip(self, destination, progress_callback=None, parallel_downloads=16, parallel_chunks_size=10):
    """Download this task's assets archive to a directory.

    If the node advertises HTTP range support (Accept-Ranges: bytes) and the
    archive is larger than one chunk, the archive is fetched with multiple
    parallel Range requests and stitched together in order by a merge thread;
    otherwise a single-stream download is used.

    Args:
        destination (str): Directory where to download assets archive. If the directory does not exist, it will be created.
        progress_callback (function): Optional callback with one parameter, the download progress percentage.
        parallel_downloads (int): Maximum number of parallel downloads if the node supports http range.
        parallel_chunks_size (int): Size in MB of chunks for parallel downloads
    Returns:
        Path (str): Path to archive file (.zip)
    """
    # Only completed tasks have a downloadable archive.
    info = self.info()
    if info.status != TaskStatus.COMPLETED:
        raise NodeResponseError("Cannot download task, task status is " + str(info.status))

    if not os.path.exists(destination):
        os.makedirs(destination, exist_ok=True)

    try:
        download_stream = self.get('/task/{}/download/all.zip'.format(self.uuid), stream=True)
        headers = download_stream.headers

        # Timestamp in the filename avoids clobbering earlier downloads.
        zip_path = os.path.join(destination, "{}_{}_all.zip".format(self.uuid, int(time.time())))

        # Keep track of download progress (if possible)
        content_length = download_stream.headers.get('content-length')
        total_length = int(content_length) if content_length is not None else None
        downloaded = 0
        chunk_size = int(parallel_chunks_size * 1024 * 1024)
        use_fallback = False
        accept_ranges = headers.get('accept-ranges')

        # Can we do parallel downloads? Requires range support, a known total
        # size larger than one chunk, and more than one worker.
        if accept_ranges is not None and accept_ranges.lower() == 'bytes' and total_length is not None and total_length > chunk_size and parallel_downloads > 1:
            num_chunks = int(math.ceil(total_length / float(chunk_size)))
            num_workers = parallel_downloads

            # Class used as a mutable namespace so the nested worker/merge
            # functions can share and rebind this state.
            class nonloc:
                completed_chunks = AtomicCounter(0)
                merge_chunks = [False] * num_chunks
                error = None

            def merge():
                # Append .partN files to the final archive strictly in order
                # as workers mark them done; polls every 100ms.
                current_chunk = 0

                with open(zip_path, "wb") as out_file:
                    while current_chunk < num_chunks and nonloc.error is None:
                        if nonloc.merge_chunks[current_chunk]:
                            chunk_file = "%s.part%s" % (zip_path, current_chunk)
                            with open(chunk_file, "rb") as fd:
                                out_file.write(fd.read())

                            os.unlink(chunk_file)

                            current_chunk += 1
                        else:
                            time.sleep(0.1)

            def worker():
                # Pull (part_num, byte_range) jobs off the queue and fetch
                # each range with an HTTP Range request.
                while True:
                    task = q.get()
                    part_num, bytes_range = task
                    # (-1, None) is the stop sentinel; also exit early if
                    # another worker recorded an error.
                    if bytes_range is None or nonloc.error is not None:
                        q.task_done()
                        break

                    try:
                        # Download chunk
                        res = self.get('/task/{}/download/all.zip'.format(self.uuid), stream=True, headers={'Range': 'bytes=%s-%s' % bytes_range})
                        if res.status_code == 206:
                            with open("%s.part%s" % (zip_path, part_num), 'wb') as fd:
                                bytes_written = 0
                                try:
                                    for chunk in res.iter_content(4096):
                                        bytes_written += fd.write(chunk)
                                except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
                                    raise NodeConnectionError(str(e))

                                if bytes_written != (bytes_range[1] - bytes_range[0] + 1):
                                    # Short read: re-queue the chunk for
                                    # another worker and end this one.
                                    q.put((part_num, bytes_range))
                                    return

                            # Report progress under the counter's lock.
                            with nonloc.completed_chunks.lock:
                                nonloc.completed_chunks.value += 1

                                if progress_callback is not None:
                                    progress_callback(100.0 * nonloc.completed_chunks.value / num_chunks)

                            nonloc.merge_chunks[part_num] = True
                        else:
                            # Server did not honor the Range header; fall back
                            # to a single-stream download.
                            nonloc.error = RangeNotAvailableError()
                    except GenericError as e:
                        # Transient node error: back off, then retry the chunk.
                        time.sleep(5)
                        q.put((part_num, bytes_range))
                    except Exception as e:
                        nonloc.error = e
                    finally:
                        q.task_done()

            q = queue.PriorityQueue()
            threads = []
            for i in range(num_workers):
                t = threading.Thread(target=worker)
                t.start()
                threads.append(t)

            merge_thread = threading.Thread(target=merge)
            merge_thread.start()

            range_start = 0

            # Enqueue one (index, inclusive byte range) job per chunk.
            for i in range(num_chunks):
                range_end = min(range_start + chunk_size - 1, total_length - 1)
                q.put((i, (range_start, range_end)))
                range_start = range_end + 1

            # block until all tasks are done
            while not all(nonloc.merge_chunks) and nonloc.error is None:
                time.sleep(0.1)

            # stop workers (-1 sorts before real jobs in the priority queue)
            for i in range(len(threads)):
                q.put((-1, None))
            for t in threads:
                t.join()

            merge_thread.join()

            if nonloc.error is not None:
                if isinstance(nonloc.error, RangeNotAvailableError):
                    use_fallback = True
                else:
                    raise nonloc.error
        else:
            use_fallback = True

        if use_fallback:
            # Single connection, boring download
            with open(zip_path, 'wb') as fd:
                for chunk in download_stream.iter_content(4096):
                    downloaded += len(chunk)

                    if progress_callback is not None and total_length is not None:
                        progress_callback((100.0 * float(downloaded) / total_length))

                    fd.write(chunk)

    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, ReadTimeoutError) as e:
        raise NodeConnectionError(e)

    return zip_path

download_assets(destination, progress_callback=None, parallel_downloads=16, parallel_chunks_size=10)

Download this task's assets to a directory.

Parameters:

Name Type Description Default
destination str

Directory where to download assets. If the directory does not exist, it will be created.

required
progress_callback function

Optional callback with one parameter, the download progress percentage

None
parallel_downloads int

Maximum number of parallel downloads if the node supports http range.

16
parallel_chunks_size int

Size in MB of chunks for parallel downloads

10

Returns: Path (str): Path to saved assets

Source code in pyodx/api.py
def download_assets(self, destination, progress_callback=None, parallel_downloads=16, parallel_chunks_size=10):
    """Download this task's assets to a directory.

    Args:
        destination (str): Directory where to download assets. If the directory does not exist, it will be created.
        progress_callback (function): Optional callback with one parameter, the download progress percentage
        parallel_downloads (int): Maximum number of parallel downloads if the node supports http range.
        parallel_chunks_size (int): Size in MB of chunks for parallel downloads
    Returns:
        Path (str): Path to saved assets
    """
    # Fetch the archive first, extract it into the destination, then discard
    # the archive itself.
    archive = self.download_zip(destination,
                                progress_callback=progress_callback,
                                parallel_downloads=parallel_downloads,
                                parallel_chunks_size=parallel_chunks_size)
    with zipfile.ZipFile(archive, "r") as archive_handle:
        archive_handle.extractall(destination)
    os.remove(archive)
    return destination

wait_for_completion(status_callback=None, interval=3, max_retries=5, retry_timeout=5)

Wait for the task to complete. The call will block until the task status has become TaskStatus.COMPLETED. If the status is set to TaskStatus.CANCELED or TaskStatus.FAILED it raises a TaskFailedError exception.

Parameters:

Name Type Description Default
status_callback function

Optional callback that will be called with task info updates every interval seconds.

None
interval int

Seconds between status checks.

3
max_retries int

Number of repeated attempts that should be made to receive a status update before giving up.

5
retry_timeout int

Wait N*retry_timeout between attempts, where N is the attempt number.

5
Source code in pyodx/api.py
def wait_for_completion(self, status_callback=None, interval=3, max_retries=5, retry_timeout=5):
    """Block until this task reaches a terminal state.

    Polls `info()` every `interval` seconds until the status becomes
    `TaskStatus.COMPLETED`. If it becomes `TaskStatus.CANCELED` or
    `TaskStatus.FAILED` instead, a TaskFailedError is raised.

    Args:
        status_callback (function): Optional callback that will be called with task info updates every interval seconds.
        interval (int): Seconds between status checks.
        max_retries (int): Number of repeated attempts that should be made to receive a status update before giving up.
        retry_timeout (int): Wait N*retry_timeout between attempts, where N is the attempt number.
    """
    attempts = 0

    while True:
        try:
            info = self.info()
        except NodeConnectionError:
            # Linear backoff; give up once the retry budget is spent.
            if attempts >= max_retries:
                raise
            attempts += 1
            time.sleep(attempts * retry_timeout)
            continue

        # A successful poll resets the retry budget.
        attempts = 0

        if status_callback is not None:
            status_callback(info)

        if info.status in (TaskStatus.COMPLETED, TaskStatus.CANCELED, TaskStatus.FAILED):
            break

        time.sleep(interval)

    if info.status in (TaskStatus.FAILED, TaskStatus.CANCELED):
        raise TaskFailedError(info.status)

Types

pyodx.types.NodeInfo

Bases: JsonResponse

Information about a node

Attributes:

Name Type Description
version str

Current API version

task_queue_count int

Number of tasks currently being processed or waiting to be processed

total_memory int

Amount of total RAM in the system in bytes

available_memory int

Amount of RAM available in bytes

cpu_cores int

Number of virtual CPU cores

max_images int

Maximum number of images allowed for new tasks or None if there's no limit.

max_parallel_tasks int

Maximum number of tasks that can be processed simultaneously

odm_version str

Current version of ODM (deprecated, use engine_version instead)

engine str

Lowercase identifier of the engine (odm, micmac, ...)

engine_version str

Current engine version

Source code in pyodx/types.py
class NodeInfo(JsonResponse):
    """Information about a node

    Attributes:
        version (str): Current API version
        task_queue_count (int): Number of tasks currently being processed or waiting to be processed
        total_memory (int): Amount of total RAM in the system in bytes
        available_memory (int): Amount of RAM available in bytes
        cpu_cores (int): Number of virtual CPU cores
        max_images (int): Maximum number of images allowed for new tasks or None if there's no limit.
        max_parallel_tasks (int): Maximum number of tasks that can be processed simultaneously
        odm_version (str): Current version of ODM (deprecated, use engine_version instead)
        engine (str): Lowercase identifier of the engine (odm, micmac, ...)
        engine_version (str): Current engine version
    """
    def __init__(self, json):
        get = json.get
        self.version = get('version', '?')
        self.task_queue_count = get('taskQueueCount')
        self.total_memory = get('totalMemory')
        self.available_memory = get('availableMemory')
        self.cpu_cores = get('cpuCores')
        self.max_images = get('maxImages')
        self.max_parallel_tasks = get('maxParallelTasks')
        self.engine = get('engine', '?')
        self.engine_version = get('engineVersion', '?')

        # Deprecated field, kept for backwards compatibility.
        self.odm_version = get('odmVersion', '?')

        # Older nodes only report odmVersion; infer the modern engine fields
        # from it when engineVersion is absent.
        if self.engine_version == '?' and self.odm_version != '?':
            self.engine = 'odm'
            self.engine_version = self.odm_version

pyodx.types.NodeOption

Bases: JsonResponse

A node option available to be passed to a node.

Parameters:

Name Type Description Default
domain str

Valid range of values

required
help str

Description of what this option does

required
name str

Option name

required
value str

Default value for this option

required
type str

One of: ['int', 'float', 'string', 'bool', 'enum']

required
Source code in pyodx/types.py
class NodeOption(JsonResponse):
    """A node option available to be passed to a node.

    Args:
        domain (str): Valid range of values
        help (str): Description of what this option does
        name (str): Option name
        value (str): Default value for this option
        type (str): One of: ['int', 'float', 'string', 'bool', 'enum']
    """
    def __init__(self, domain, help, name, value, type):
        # Parameter names mirror the API payload and intentionally shadow the
        # `help`/`type` builtins; values are stored verbatim.
        self.name = name
        self.type = type
        self.value = value
        self.domain = domain
        self.help = help

pyodx.types.TaskInfo

Bases: JsonResponse

Task information

Attributes:

Name Type Description
uuid str

Unique identifier

name str

Human friendly name

date_created datetime

Creation date and time

processing_time int

Milliseconds that have elapsed since the start of processing, or -1 if no information is available.

status TaskStatus

Status (running, queued, etc.)

last_error str

If the task fails, this will be set to a string representing the last error that occurred, otherwise it's an empty string.

options dict

Options used for this task

images_count int

Number of images (+ GCP file)

progress float

Percentage progress (estimated) of the task

output list[str]

Optional console output (one list item per row). This is populated only if the with_output parameter is passed to info().

Source code in pyodx/types.py
class TaskInfo(JsonResponse):
    """Task information

    Attributes:
        uuid (str): Unique identifier
        name (str): Human friendly name
        date_created (datetime): Creation date and time (naive datetime, UTC)
        processing_time (int): Milliseconds that have elapsed since the start of processing, or -1 if no information is available.
        status (TaskStatus): Status (running, queued, etc.)
        last_error (str): If the task fails, this will be set to a string representing the last error that occurred, otherwise it's an empty string.
        options (dict): Options used for this task
        images_count (int): Number of images (+ GCP file)
        progress (float): Percentage progress (estimated) of the task
        output (list[str]): Optional console output (one list item per row). This is populated only if the with_output parameter is passed to info().
    """
    def __init__(self, json):
        # Function-scope import so the module-level import block is untouched.
        from datetime import timezone

        self.uuid = json['uuid']
        self.name = json['name']
        # dateCreated is in milliseconds; truncate to whole seconds and build
        # a naive UTC datetime. datetime.utcfromtimestamp() is deprecated
        # since Python 3.12, so construct an aware UTC datetime and strip the
        # tzinfo — this yields exactly the same value as the old call.
        self.date_created = datetime.fromtimestamp(
            int(json['dateCreated'] / 1000.0), tz=timezone.utc
        ).replace(tzinfo=None)
        self.processing_time = json['processingTime']
        self.status = TaskStatus(json['status']['code'])
        # 'errorMessage' is only present in the status payload of failed tasks.
        self.last_error = json['status'].get('errorMessage', '')
        self.options = json['options']
        self.images_count = json['imagesCount']
        # Older node versions may omit these fields; fall back to safe defaults.
        self.progress = json.get('progress', 0)
        self.output = json.get('output', [])

pyodx.types.TaskStatus

Bases: Enum

Task status

Attributes:

Name Type Description
QUEUED

Task's files have been uploaded and are waiting to be processed.

RUNNING

Task is currently being processed.

FAILED

Task has failed for some reason (not enough images, out of memory, etc.).

COMPLETED

Task has completed. Assets are ready to be downloaded.

CANCELED

Task was manually canceled by the user.

Source code in pyodx/types.py
class TaskStatus(Enum):
    """Task status reported by a processing node.

    Attributes:
        QUEUED: Task's files have been uploaded and are waiting to be processed.
        RUNNING: Task is currently being processed.
        FAILED: Task has failed for some reason (not enough images, out of memory, etc.).
        COMPLETED: Task has completed. Assets are ready to be downloaded.
        CANCELED: Task was manually canceled by the user.
    """
    QUEUED = 10
    RUNNING = 20
    FAILED = 30
    COMPLETED = 40
    CANCELED = 50

Exceptions

pyodx.exceptions.GenericError

Bases: Exception

Generic catch-all exception. All exceptions in pyodx inherit from it.

Source code in pyodx/exceptions.py
1
2
3
class GenericError(Exception):
    """Base catch-all exception; every pyodx exception derives from it."""

pyodx.exceptions.NodeServerError

Bases: GenericError

The server replied in a manner which we did not expect. Usually this indicates a temporary malfunction of the node.

Source code in pyodx/exceptions.py
5
6
7
8
class NodeServerError(GenericError):
    """Raised when the server replied in an unexpected manner, which usually
    indicates a temporary malfunction of the node."""

pyodx.exceptions.NodeConnectionError

Bases: GenericError

A connection problem (such as a timeout or a network error) has occurred.

Source code in pyodx/exceptions.py
class NodeConnectionError(GenericError):
    """Raised when a connection problem (such as a timeout or a network
    error) has occurred."""

pyodx.exceptions.NodeResponseError

Bases: GenericError

The node responded with an error message indicating that the requested operation failed.

Source code in pyodx/exceptions.py
class NodeResponseError(GenericError):
    """Raised when the node responded with an error message indicating that
    the requested operation failed."""

pyodx.exceptions.TaskFailedError

Bases: GenericError

A task did not complete successfully.

Source code in pyodx/exceptions.py
class TaskFailedError(GenericError):
    """Raised when a task did not complete successfully."""

pyodx.exceptions.RangeNotAvailableError

Bases: GenericError

A download attempt to use Range requests failed.

Source code in pyodx/exceptions.py
class RangeNotAvailableError(GenericError):
    """Raised when a download attempt to use Range requests failed."""